package flinkdemo.kafak;

import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

/**
 * {@link KafkaSerializationSchema} that sends every element to a single, fixed Kafka topic
 * as UTF-8 encoded JSON.
 *
 * <p>Elements are converted with fastjson's {@code JSON.toJSONString}. The resulting
 * {@link ProducerRecord} is built from the topic and the value only; no key, partition or
 * timestamp is passed to its constructor.
 *
 * @param <T> type of the elements to serialize
 * @author Administrator
 */
public class ProductSerializationSchema<T> implements KafkaSerializationSchema<T> {

    /** Destination topic; assigned once in the constructor. */
    private final String topic;

    /**
     * Creates a schema bound to the given topic.
     *
     * @param topic name of the Kafka topic every record is sent to; must not be {@code null}
     * @throws IllegalArgumentException if {@code topic} is {@code null}
     */
    public ProductSerializationSchema(String topic) {
        // Reject a missing topic here instead of letting it surface later, when the first
        // record is built inside serialize().
        if (topic == null) {
            throw new IllegalArgumentException("topic must not be null");
        }
        this.topic = topic;
    }

    /**
     * Serializes one element into a Kafka record for the configured topic.
     *
     * @param element   the element to write; converted to a JSON string and encoded as UTF-8
     * @param timestamp timestamp supplied by the caller; it is only printed and is not
     *                  attached to the returned record
     * @return a record holding the configured topic and the JSON bytes as its value
     */
    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp) {
        // NOTE(review): this prints to stdout once per record. It looks like leftover debug
        // output; consider moving it to the project's logger at debug level or removing it.
        System.out.println("发送时间：" + timestamp);

        byte[] value = JSON.toJSONString(element).getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(topic, value);
    }
}